Annotation of multiplexer/dispatch.c, revision 1.4
1.1 oskar 1: /*
2: * ISO 13818 stream multiplexer
3: * Copyright (C) 2001 Convergence Integrated Media GmbH Berlin
4: * Author: Oskar Schirmer (oskar@convergence.de)
5: */
6:
7: /*
8: * Module: Dispatch
9: * Purpose: Main dispatching loop.
10: *
11: * The first two buffer stages (raw file data and input stream data)
12: * are filled greedily, the following two stages (from input stream to
13: * output data buffer, and further to stdout) are timing controlled.
14: */
15:
16: #include "global.h"
17: #include "error.h"
18: #include "splice.h"
19: #include "input.h"
20: #include "output.h"
21: #include "command.h"
22: #include "dispatch.h"
23:
24: boolean fatal_error; /* when set, dispatch() leaves its main loop and its final output loop */
25: boolean force_quit; /* when set, dispatch() leaves its main loop */
26: boolean busy_work; /* when set, keeps the main loop of dispatch() running even if nothing else is pending */
27:
28: boolean dispatch_init (void) /* Reset the dispatcher's global control flags; always returns TRUE. */
29: {
30: fatal_error = FALSE; /* no fatal error seen yet */
31: force_quit = FALSE; /* no quit requested yet */
32: busy_work = FALSE; /* do not keep the loop alive without pending work */
33: return (TRUE); /* there is no failure path in this function */
34: }
35:
1.3 oskar 36: /* Dispatch work to the modules as needed.
37: * Mainly, check a few internal conditions (buffer space,
1.4 ! oskar 38: * data availability), check the corresponding files with
1.3 oskar 39: * poll for readiness, then do something at the appropriate
40: * points to push data forward.
41: * Data is pushed through the multiplexer unidirectionally:
42: * input --> f.rawdata --> s.pesdata --> o.spliceddata --> output
43: * The main loop continues as long as there is something to do.
44: * Thereafter, the last two stages may generate a finish and
45: * another loop tries to pump the buffers out.
46: */
1.1 oskar 47: void dispatch (void) /* Run the main poll loop until no work is left or fatal_error/force_quit is set, then finish and drain the output. */
48: {
49: boolean bi, bo, bs; /* bi/bo: results of input_acceptable/output_available; bs: split_something is to be called */
50: stream_descr *st; /* stream obtained from input_available/process_something, NULL if none */
1.2 oskar 51: t_msec tmo; /* timeout passed to poll; presumably written through the pointer by output_available/input_acceptable */
1.1 oskar 52: unsigned int nfds, onfds, infds; /* nfds: entries used in ufds; onfds/infds: index where the output/input entries start */
53: struct pollfd ufds [MAX_POLLFD]; /* poll set, laid out as: command entries, output entries, input entries */
54: warn (LDEB,"Dispatch",EDIS,0,0,0);
55: bs = FALSE;
56: st = input_available ();
57: nfds = 0; /* build the initial poll set: command entries first ... */
58: command_expected (&nfds, &ufds[0]);
59: onfds = nfds; /* ... then the output entries start here */
60: bo = output_available (&nfds, &ufds[onfds], &tmo);
61: while ((bo /* keep looping while any source of work remains ... */
62: || bs
63: || (st != NULL)
64: || input_expected ()
65: || ((tmo >= 0) && (tmo <= MAX_MSEC_OUTDELAY))
66: || busy_work)
67: && (!fatal_error) /* ... and neither abort flag has been raised */
68: && (!force_quit)) {
69: infds = nfds; /* input entries are appended after the output entries */
70: bi = input_acceptable (&nfds, &ufds[infds], &tmo, output_acceptable ());
1.4 ! oskar 71: if ((bs) /* work can be done right away (split pending, or a stream ready and output acceptable): */
! 72: || ((st != NULL) && output_acceptable ())) {
1.1 oskar 73: tmo = 0; /* so poll must not block */
74: }
75: warn (LDEB,"Poll",EDIS,0,1,tmo);
76: #ifdef DEBUG_TIMEPOLL
77: ltp->tmo = tmo;
78: #endif
79: if (!timed_io) { /* without timed I/O, do not wait: add the delay to global_delta instead */
80: if (tmo > 0) {
81: global_delta += tmo;
82: tmo = 0;
83: warn (LDEB,"Global Delta",EDIS,0,3,global_delta);
84: #ifdef DEBUG_TIMEPOLL
85: ltp->flags |= LTP_FLAG_DELTASHIFT;
86: #endif
87: }
88: }
89: #ifdef DEBUG_TIMEPOLL
90: if (ltp->usec != 0) {
91: struct timeval tv;
92: gettimeofday (&tv,NULL);
93: ltp->usec -= tv.tv_usec;
94: }
95: ltp->nfdso = infds - onfds; /* number of output entries in the poll set */
96: ltp->nfdsi = nfds - infds; /* number of input entries in the poll set */
97: ltp->flags |=
98: (bo ? LTP_FLAG_OUTPUT : 0) |
99: (bi ? LTP_FLAG_INPUT : 0) |
100: (bs ? LTP_FLAG_SPLIT : 0) |
101: (st != NULL ? LTP_FLAG_PROCESS : 0);
102: ltp->sr = deb_inraw_free (0);
103: ltp->si = deb_instr_free (1);
104: ltp->so = output_free ();
105: ltp->nfdsrevent = /* with DEBUG_TIMEPOLL this assignment continues onto the poll call below */
106: #endif
107: poll (&ufds[0], nfds, tmo); /* NOTE(review): return value is discarded unless DEBUG_TIMEPOLL is defined; revents are inspected below even if poll failed - confirm this is safe */
108: #ifdef DEBUG_TIMEPOLL
109: if (logtpc < (max_timepoll-1)) { /* advance to the next log record while room remains */
110: struct timeval tv;
111: logtpc += 1;
112: ltp++;
113: gettimeofday (&tv,NULL);
114: ltp->usec = tv.tv_usec;
115: }
116: #endif
117: warn (LDEB,"Poll done",EDIS,0,2,nfds);
118: if ((0 < onfds) /* a command entry exists only if command_expected added one at index 0 */
119: && (ufds[0].revents & (POLLIN | POLLHUP | POLLERR))) {
120: command_process (ufds[0].revents & POLLIN); /* argument is non-zero only when data is readable */
121: }
122: if (bo /* the output entry sits at index onfds */
123: && (ufds[onfds].revents & (POLLOUT | POLLHUP | POLLERR))) {
124: output_something (ufds[onfds].revents & POLLOUT); /* argument is non-zero only when writable */
125: }
126: if (bi) {
127: while (infds < nfds) { /* walk all input entries of the poll set */
128: if (ufds[infds].revents & (POLLIN | POLLHUP | POLLERR)) {
129: input_something (input_filehandle (ufds[infds].fd),
130: ufds[infds].revents & POLLIN);
131: bi = FALSE; /* marks that at least one input entry was handled */
132: bs = TRUE; /* new raw data may need splitting */
133: }
134: infds += 1;
135: }
136: if (!bi) { /* some input was handled: look for a stream if none is held */
137: if (st == NULL) {
138: st = input_available ();
139: }
140: }
141: }
142: if (bs) {
143: bs = split_something (); /* result decides whether splitting is attempted again next round */
144: }
145: if ((st != NULL) && output_acceptable ()) {
146: st = process_something (st); /* returned stream replaces the current one */
147: bs = TRUE;
148: }
149: if (st == NULL) {
150: st = input_available ();
151: }
152: nfds = 0; /* rebuild the command and output part of the poll set for the next iteration */
153: command_expected (&nfds, &ufds[0]);
154: onfds = nfds;
155: bo = output_available (&nfds, &ufds[onfds], &tmo);
156: }
157: process_finish ();
1.2 oskar 158: output_finish ();
1.1 oskar 159: while ((output_available (&nfds, &ufds[0], &tmo) /* drain loop: no poll here; NOTE(review): nfds is not reset to 0 before this call - confirm intended */
160: || (tmo >= 0))
161: && (!fatal_error)) {
162: output_something (TRUE);
163: }
164: #ifdef DEBUG_TIMEPOLL
165: { /* dump the collected poll log records to stderr */
166: int i, u, s; /* i: record index; u: per-record value derived from usec; s: sum of all u */
167: i = 0;
168: s = 0;
169: fprintf (stderr,"lines: %8d\n",(int)logtpc);
170: while (i < logtpc) {
171: u = (logtp[i].usec > 0 ? 1000000 : 0) - logtp[i].usec;
172: s += u;
173: fprintf (stderr,
174: "%08d %10d.%06d:%8d (%6d) %c%5d %d %d/%d %c%c%c%c %d (F:%6d,S:%6d,O:%6d)\n",
175: i,
176: (int)logtp[i].tv.tv_sec,
177: (int)logtp[i].tv.tv_usec,
178: logtp[i].msec_now,
179: u,
180: logtp[i].flags & LTP_FLAG_DELTASHIFT ? 'D' : ' ',
181: logtp[i].tmo,
182: logtp[i].cnt_msecnow,
183: logtp[i].nfdsi,
184: logtp[i].nfdso,
185: logtp[i].flags & LTP_FLAG_INPUT ? 'I' : ' ',
186: logtp[i].flags & LTP_FLAG_SPLIT ? 'S' : ' ',
187: logtp[i].flags & LTP_FLAG_PROCESS ? 'P' : ' ',
188: logtp[i].flags & LTP_FLAG_OUTPUT ? 'O' : ' ',
189: logtp[i].nfdsrevent,
190: logtp[i].sr,
191: logtp[i].si,
192: logtp[i].so
193: );
194: i += 1;
195: }
196: fprintf (stderr,"%43d\n",s); /* total of the per-record u values */
197: }
198: #endif
199: }
200:
LinuxTV legacy CVS <linuxtv.org/cvs>